# Part of Odoo. See LICENSE file for full copyright and licensing details.

from markupsafe import Markup

from odoo.addons.mail.tests.common import mail_new_test_user, MailCommon
from odoo.addons.mail.tools.discuss import Store
from odoo.exceptions import UserError
from odoo.tests.common import tagged, users, HttpCase
from odoo.tools import is_html_empty, mute_logger, formataddr


@tagged("mail_message", "post_install", "-at_install")
class TestMessageValues(MailCommon):

    @classmethod
    def setUpClass(cls):
        super(TestMessageValues, cls).setUpClass()

        cls.alias_record = cls.env['mail.test.container'].with_context(cls._test_context).create({
            'name': 'Pigs',
            'alias_name': 'pigs',
            'alias_contact': 'followers',
        })
        cls.Message = cls.env['mail.message'].with_user(cls.user_employee)

    @users('employee')
    def test_empty_message(self):
        """ Test that message is correctly considered as empty (see `_filter_empty()`).
        Message considered as empty if:
            - no body or empty body
            - AND no subtype or no subtype description
            - AND no tracking values
            - AND no attachment

        Check _update_content behavior when voiding messages (cleanup side
        records: stars, notifications).
        """
        note_subtype = self.env.ref('mail.mt_note')
        _attach_1 = self.env['ir.attachment'].with_user(self.user_employee).create({
            'name': 'Attach1',
            'datas': 'bWlncmF0aW9uIHRlc3Q=',
            'res_id': 0,
            'res_model': 'mail.compose.message',
        })
        record = self.env['mail.test.track'].create({'name': 'EmptyTesting'})
        self.flush_tracking()
        record.message_subscribe(partner_ids=self.partner_admin.ids, subtype_ids=note_subtype.ids)
        message = record.message_post(
            attachment_ids=_attach_1.ids,
            body='Test',
            message_type='comment',
            subtype_id=note_subtype.id,
        )
        message.write({'starred_partner_ids': [(4, self.partner_admin.id)]})

        # check content
        self.assertEqual(len(message.attachment_ids), 1)
        self.assertFalse(is_html_empty(message.body))
        self.assertEqual(len(message.sudo().notification_ids), 1)
        self.assertEqual(message.notified_partner_ids, self.partner_admin)
        self.assertEqual(message.starred_partner_ids, self.partner_admin)
        self.assertFalse(message.sudo().tracking_value_ids)

        # Reset body case
        record._message_update_content(
            message,
            body=Markup("<p><br /></p>"),
            attachment_ids=message.attachment_ids.ids,
        )
        self.assertTrue(is_html_empty(message.body))
        self.assertFalse(message.sudo()._filter_empty(), 'Still having attachments')

        # Subtype content
        note_subtype.sudo().write({'description': 'Very important discussions'})
        record._message_update_content(message, body="", attachment_ids=[])
        self.assertFalse(message.attachment_ids)
        self.assertEqual(message.notified_partner_ids, self.partner_admin)
        self.assertEqual(message.starred_partner_ids, self.partner_admin)
        self.assertFalse(message.sudo()._filter_empty(), 'Subtype with description')

        # Completely emptied now
        note_subtype.sudo().write({'description': ''})
        self.assertEqual(message.sudo()._filter_empty(), message)
        record._message_update_content(message.sudo(), body="", attachment_ids=[])
        self.assertEqual(message.notified_partner_ids, self.partner_admin)  # message still notified (albeit content is removed)
        self.assertEqual(message.starred_partner_ids, self.partner_admin)  # starred messages stay (albeit content is removed)

        # test tracking values
        record.write({'user_id': self.user_admin.id})
        self.flush_tracking()
        tracking_message = record.message_ids[0]
        self.assertFalse(tracking_message.attachment_ids)
        self.assertTrue(is_html_empty(tracking_message.body))
        self.assertFalse(tracking_message.subtype_id.description)
        self.assertFalse(tracking_message.sudo()._filter_empty(), 'Has tracking values')
        with self.assertRaises(UserError, msg='Tracking values prevent from updating content'):
            record._message_update_content(tracking_message, body="", attachment_ids=[])

    @mute_logger('odoo.models.unlink')
    def test_mail_message_to_store_access(self):
        """
        User that doesn't have access to a record should still be able to fetch
        the record_name inside message _to_store.
        """
        company_2 = self.env['res.company'].create({'name': 'Second Test Company'})
        record1 = self.env['mail.test.multi.company'].create({
            'name': 'Test1',
            'company_id': company_2.id,
        })
        message = record1.message_post(body='', partner_ids=[self.user_employee.partner_id.id])
        # We need to flush and invalidate the ORM cache since the record_name
        # is already cached from the creation. Otherwise it will leak inside
        # message _to_store.
        self.env.flush_all()
        self.env.invalidate_all()
        res = Store().add(message.with_user(self.user_employee)).get_result()
        self.assertEqual(res["mail.message"][0].get("record_name"), "Test1")

        record1.write({"name": "Test2"})
        self.env.flush_all()
        self.env.invalidate_all()
        res = Store().add(message.with_user(self.user_employee)).get_result()
        self.assertEqual(res["mail.message"][0].get('record_name'), 'Test2')

        # check model not inheriting from mail.thread -> should not crash
        record_nothread = self.env['mail.test.nothread'].create({'name': 'NoThread'})
        message = self.env['mail.message'].create({
            'model': record_nothread._name,
            'res_id': record_nothread.id,
        })
        formatted = Store().add(message).get_result()["mail.message"][0]
        self.assertEqual(formatted['record_name'], record_nothread.name)

    def test_records_by_message(self):
        record1 = self.env["mail.test.simple"].create({"name": "Test1"})
        record2 = self.env["mail.test.simple"].create({"name": "Test1"})
        record3 = self.env["mail.test.nothread"].create({"name": "Test2"})
        messages = self.env["mail.message"].create(
            [
                {
                    "model": record._name,
                    "res_id": record.id,
                }
                for record in [record1, record2, record3]
            ]
        )
        # methods called on batch of message
        records_by_model_name = messages._records_by_model_name()
        test_simple_records = records_by_model_name["mail.test.simple"]
        self.assertEqual(test_simple_records, record1 + record2)
        self.assertEqual(test_simple_records._prefetch_ids, tuple((record1 + record2).ids))
        test_no_thread_records = records_by_model_name["mail.test.nothread"]
        self.assertEqual(test_no_thread_records, record3)
        self.assertEqual(test_no_thread_records._prefetch_ids, tuple(record3.ids))
        record_by_message = messages._record_by_message()
        m0_records = record_by_message[messages[0]]
        self.assertEqual(m0_records, record1)
        self.assertEqual(m0_records._prefetch_ids, tuple((record1 + record2).ids))
        m1_records = record_by_message[messages[1]]
        self.assertEqual(m1_records, record2)
        self.assertEqual(m1_records._prefetch_ids, tuple((record1 + record2).ids))
        m2_records = record_by_message[messages[2]]
        self.assertEqual(m2_records, record3)
        self.assertEqual(m2_records._prefetch_ids, tuple(record3.ids))
        # methods called on individual message from a batch: prefetch from batch is kept
        records_by_model_name = next(iter(messages))._records_by_model_name()
        test_simple_records = records_by_model_name["mail.test.simple"]
        self.assertEqual(test_simple_records, record1)
        self.assertEqual(test_simple_records._prefetch_ids, tuple((record1 + record2).ids))
        record_by_message = next(iter(messages))._record_by_message()
        m0_records = record_by_message[messages[0]]
        self.assertEqual(m0_records, record1)
        self.assertEqual(m0_records._prefetch_ids, tuple((record1 + record2).ids))

    def test_mail_message_values_body_base64_image(self):
        msg = self.env['mail.message'].with_user(self.user_employee).create({
            'body': 'taratata <img src="" width="2"> <img src="" width="2">',
        })
        self.assertEqual(len(msg.attachment_ids), 1)
        self.assertEqual(
            msg.body,
            '<p>taratata <img src="/web/image/{attachment.id}?access_token={attachment.access_token}" alt="image0" width="2"> '
            '<img src="/web/image/{attachment.id}?access_token={attachment.access_token}" alt="image0" width="2"></p>'.format(attachment=msg.attachment_ids[0])
        )

    @mute_logger('odoo.models.unlink', 'odoo.addons.mail.models.models')
    @users('employee')
    def test_mail_message_values_fromto_long_name(self):
        """ Long headers may break in python if above 68 chars for certain
        DKIM verification stacks as folding is not done correctly
        (see ``_notify_get_reply_to_formatted_email`` docstring
        + commit linked to this test). """
        # name would make it blow up: keep only email
        test_record = self.env['mail.test.container'].browse(self.alias_record.ids)
        self.user_employee.write({
            'name': 'Super Long Name That People May Enter "Even with an internal quoting of stuff"'
        })
        msg = self.env['mail.message'].create({
            'model': test_record._name,
            'res_id': test_record.id
        })
        reply_to_email = f"{test_record.alias_name}@{self.alias_domain}"
        self.assertEqual(msg.reply_to, reply_to_email,
                         'Reply-To: use only email when formataddr > 68 chars')

        # whatever the record and company names, email is too long: keep only email
        test_record.write({
            'alias_name': 'Waaaay too long alias name that should make any reply-to blow the 68 characters limit',
            'name': 'Short',
        })
        sanitized_alias_name = 'waaaay-too-long-alias-name-that-should-make-any-reply-to-blow-the-68-characters-limit'
        msg = self.env['mail.message'].create({
            'model': test_record._name,
            'res_id': test_record.id
        })
        self.assertEqual(msg.reply_to, f"{sanitized_alias_name}@{self.alias_domain}",
                         'Reply-To: even a long email is ok as only formataddr is problematic')

    @users('employee')
    @mute_logger('odoo.models.unlink')
    def test_mail_message_values_fromto_no_document_values(self):
        msg = self.Message.create({
            'reply_to': 'test.reply@example.com',
            'email_from': 'test.from@example.com',
        })
        self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one')
        self.assertEqual(msg.reply_to, 'test.reply@example.com')
        self.assertEqual(msg.email_from, 'test.from@example.com')

    @users('employee')
    @mute_logger('odoo.models.unlink')
    def test_mail_message_values_fromto_no_document(self):
        msg = self.Message.create({})
        self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one')
        reply_to_name = self.user_employee.name
        reply_to_email = '%s@%s' % (self.alias_catchall, self.alias_domain)
        self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

        # no alias domain -> author
        self.env.company.sudo().alias_domain_id = False
        self.assertFalse(self.env.company.catchall_email)

        msg = self.Message.create({})
        self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one')
        self.assertEqual(msg.reply_to, formataddr((self.user_employee.name, self.user_employee.email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

    @users('employee')
    @mute_logger('odoo.models.unlink')
    def test_mail_message_values_fromto_document_alias(self):
        msg = self.Message.create({
            'model': 'mail.test.container',
            'res_id': self.alias_record.id
        })
        self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0])
        reply_to_name = self.user_employee.name
        reply_to_email = '%s@%s' % (self.alias_record.alias_name, self.alias_domain)
        self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

        # no alias domain, no company catchall -> author
        self.alias_record.alias_domain_id = False
        self.env.company.sudo().alias_domain_id = False
        self.assertFalse(self.env.company.catchall_email)

        msg = self.Message.create({
            'model': 'mail.test.container',
            'res_id': self.alias_record.id
        })
        self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0])
        self.assertEqual(msg.reply_to, formataddr((self.user_employee.name, self.user_employee.email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

        # alias wins over company, hence no catchall is not an issue
        self.alias_record.alias_domain_id = self.mail_alias_domain

        msg = self.Message.create({
            'model': 'mail.test.container',
            'res_id': self.alias_record.id
        })
        self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0])
        self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

    @users('employee')
    @mute_logger('odoo.models.unlink')
    def test_mail_message_values_fromto_document_no_alias(self):
        test_record = self.env['mail.test.simple'].create({'name': 'Test', 'email_from': 'ignasse@example.com'})

        msg = self.Message.create({
            'model': 'mail.test.simple',
            'res_id': test_record.id
        })
        self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, msg.message_id.split('@')[0])
        reply_to_name = self.user_employee.name
        reply_to_email = '%s@%s' % (self.alias_catchall, self.alias_domain)
        self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

    @users('employee')
    @mute_logger('odoo.models.unlink')
    def test_mail_message_values_fromto_document_manual_alias(self):
        test_record = self.env['mail.test.simple'].create({'name': 'Test', 'email_from': 'ignasse@example.com'})
        alias = self.env['mail.alias'].sudo().create({
            'alias_name': 'MegaLias',
            'alias_model_id': self.env['ir.model']._get('mail.test.simple').id,
            'alias_parent_model_id': self.env['ir.model']._get('mail.test.simple').id,
            'alias_parent_thread_id': test_record.id,
        })

        msg = self.Message.create({
            'model': 'mail.test.simple',
            'res_id': test_record.id
        })

        self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, msg.message_id.split('@')[0])
        reply_to_name = self.user_employee.name
        reply_to_email = '%s@%s' % (alias.alias_name, self.alias_domain)
        self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email)))
        self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email)))

    @users('employee')
    def test_mail_message_values_fromto_reply_to_force_new(self):
        msg = self.Message.create({
            'model': 'mail.test.container',
            'res_id': self.alias_record.id,
            'reply_to_force_new': True,
        })
        self.assertIn('reply_to', msg.message_id.split('@')[0])
        self.assertNotIn('mail.test.container', msg.message_id.split('@')[0])
        self.assertNotIn('-%d-' % self.alias_record.id, msg.message_id.split('@')[0])

    def test_mail_message_values_misc(self):
        """ Test various values on mail.message, notably default values """
        msg = self.env['mail.message'].create({'model': self.alias_record._name, 'res_id': self.alias_record.id})
        self.assertEqual(msg.message_type, 'comment', 'Message should be comments by default')


@tagged("mail_message")
class TestMessageLinks(MailCommon, HttpCase):

    def test_message_link_by_employee(self):
        record = self.env['mail.test.simple'].create({'name': 'Test1'})
        thread_message = record.message_post(body='Thread Message', message_type='comment')
        deleted_message = record.message_post(body='', message_type='comment')
        self.authenticate(self.user_employee.login, self.user_employee.login)
        with self.subTest(thread_message=thread_message):
            expected_url = self.base_url() + f'/odoo/{thread_message.model}/{thread_message.res_id}?highlight_message_id={thread_message.id}'
            res = self.url_open(f'/mail/message/{thread_message.id}')
            self.assertEqual(res.url, expected_url)
            self.assertEqual(res.url, expected_url)
        with self.subTest(deleted_message=deleted_message):
            res = self.url_open(f'/mail/message/{deleted_message.id}')

@tagged("mail_message", "mail_store", "post_install", "-at_install")
class TestMessageStore(MailCommon, HttpCase):

    def test_store_data_use_display_name(self):
        test_record = self.env['mail.test.simple.unnamed'].create({'description': 'Some description'})
        user_invalid = mail_new_test_user(self.env, login='invalid', groups='base.group_portal', name='Invalid User', email='invalid email', notification_type='email')
        test_record.message_subscribe(partner_ids=user_invalid.partner_id.ids)
        self.authenticate(self.user_employee.login, self.user_employee.password)
        msg = test_record.message_post(body='Some body', author_id=self.partner_employee.id)
        # simulate failure
        self.env['mail.notification'].create({
            'author_id': msg.author_id.id,
            'mail_message_id': msg.id,
            'res_partner_id': user_invalid.partner_id.id,
            'notification_type': 'email',
            'notification_status': 'exception',
            'failure_type': 'mail_email_invalid',
        })
        res = self.make_jsonrpc_request("/mail/data", {"fetch_params": ["failures"]})
        self.assertEqual([t["display_name"] for t in res["mail.thread"]], ['Some description'])
